package com.imooc.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by zghgchao 2017/11/15 13:00
  *
  * Stateful word count with Spark Streaming, using the `updateStateByKey` operator.
  *
  * Requirement: report the cumulative number of occurrences of every word seen so far,
  * which means the counts from previous batches must be retained as state.
  *
  * Usage: `StatefulWordCount [host] [port] [checkpointDir]`. Any omitted argument falls
  * back to the defaults defined below.
  */
object StatefulWordCount {

  private val DefaultHost = "172.17.66.107"
  private val DefaultPort = 9999
  // Relative local directory; outside local testing, point this at a fault-tolerant store (e.g. HDFS).
  private val DefaultCheckpointDir = "checkpoint"

  def main(args: Array[String]): Unit = {
    val host = args.lift(0).getOrElse(DefaultHost)
    val port = args.lift(1).map(_.toInt).getOrElse(DefaultPort)
    val checkpointDir = args.lift(2).getOrElse(DefaultCheckpointDir)

    // local[2]: the socket receiver occupies one thread, so at least one more is needed to
    // process the batches. setIfMissing (rather than setMaster) lets `spark-submit --master`
    // override this default.
    val sparkConf = new SparkConf()
      .setAppName("StatefulWordCount")
      .setIfMissing("spark.master", "local[2]")

    // A StreamingContext needs two things: the SparkConf and the batch interval.
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // updateStateByKey keeps per-key state across batches and requires a checkpoint
    // directory to be configured.
    ssc.checkpoint(checkpointDir)

    val lines = ssc.socketTextStream(host, port)

    // Split on any whitespace run and drop empty tokens, so repeated/leading spaces are not
    // counted as the "" word.
    val result = lines
      .flatMap(_.split("\\s+"))
      .filter(_.nonEmpty)
      .map((_, 1))

    val state = result.updateStateByKey[Int](updateFunction _)

    // Print the accumulated counts. This output operation is also what makes the stateful
    // stream actually run; printing `result` would only show this batch's (word, 1) pairs.
    state.print()

    ssc.start()
    ssc.awaitTermination()
  }

  /**
    * Merges the counts from the current batch into the existing (previous) running total for one key.
    *
    * @param currentValues the counts seen for the key in the current batch (may be empty)
    * @param preValues     the running total from earlier batches, or `None` if the key is new
    * @return the new running total; always defined, so the key is kept in the state
    */
  def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] =
    Some(currentValues.sum + preValues.getOrElse(0))
}
